//import java.text.SimpleDateFormat
//
//import org.apache.flink.api.java.tuple.Tuple
//import org.apache.flink.streaming.api.TimeCharacteristic
//import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
//import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//import org.apache.flink.streaming.api.watermark.Watermark
//import org.apache.flink.api.scala._
//import org.apache.flink.streaming.api.windowing.time.Time
//import org.apache.flink.streaming.api.windowing.windows.TimeWindow
//import org.apache.flink.util.Collector
//
// NOTE: this entire object is commented out, so nothing below is compiled or run.
// What the disabled code does when read top to bottom: it reads text lines from a socket,
// splits each line on "," into a (String, Long, Int) tuple, assigns timestamps and
// punctuated watermarks, groups by the first tuple field into 5-second time windows,
// and prints the timestamped stream.
//object PunctuatedWatermarks {
//  def main(args: Array[String]): Unit = {
//
//    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// use EventTime as the stream time characteristic
//    val dataDS = env.socketTextStream("Desktop", 3456)
//
//
////    Configure watermarks for the data
//    val tsDS = dataDS.map(str => {
//      val strings = str.split(",")
//      (strings(0), strings(1).toLong, 1)
//    }).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String,Long,Int)]
//    {
//
////      The return type is a Watermark
//        override def checkAndGetNextWatermark(lastElement: (String, Long, Int), extractedTimestamp: Long): Watermark = {
//          if (lastElement._1 .contains("later"))
//          {
//            println("间歇性生成了水位线.....")
//                                             // Generate a watermark intermittently
//            new Watermark(extractedTimestamp)// extractedTimestamp here is just a variable; don't be intimidated by it
//          }
//          // NOTE(review): the Watermark constructed in the if-branch above is not returned;
//          // the `return null` below is reached on every call. Confirm the intent before re-enabling.
//          return null
//        }
//
//        override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
//          element._2 * 1000L// corresponds to the EventTime enabled above; presumably converts seconds to milliseconds — TODO confirm
//        }
//      }
//    )
//
//    val result = tsDS
//      .keyBy(0)
//      .timeWindow(Time.seconds(5))
//      .apply {
//        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
//          // NOTE(review): `sdf` is created but never used in this function.
//          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")//???????? (original author's marker; meaning unclear)
//        }
//      }
//    tsDS.print("water")
////    result.print("calc")
//
//    env.execute()
//  }
//}